频道栏目
首页 > 程序开发 > 软件开发 > Java > 正文
java提交spark任务到yarn平台的配置讲解
2018-06-23 12:02:27      个评论    来源:weixin_36647532的博客  
收藏   我要投稿

世界杯投注官网

采用spark的方式处理,所以需要将spark的功能集成到世界杯指定投注平台,采用yarn客户端的方式管理spark任务。不需要将cdh的一些配置文件放到resource路径下,只需要配置一些配置即可,非常方便

二、任务管理架构

\

三、接口

1、任务提交

1. /**

2. *提交任务到yarn集群

3. *

4. *@paramconditions

5. *yarn集群,spark,hdfs具体信息,参数等

6. *@returnappid

7. */

8. publicStringsubmitSpark(YarnSubmitConditionsconditions){

9. logger.info("初始化sparkonyarn参数");

10.

11. //初始化yarn客户端

12. logger.info("初始化sparkonyarn客户端");

13. Listargs=Lists.newArrayList("--jar",conditions.getApplicationJar(),"--class",

14. conditions.getMainClass());

15. if(conditions.getOtherArgs()!=null&&conditions.getOtherArgs().size()>0){

16. for(Strings:conditions.getOtherArgs()){

17. args.add("--arg");

18. args.add(org.apache.commons.lang.StringUtils.join(newString[]{s},","));

19. }

20. }

21.

22. //identifythatyouwillbeusingSparkasYARNmode

23. System.setProperty("SPARK_YARN_MODE","true");

24. SparkConfsparkConf=newSparkConf();

25. if(org.apache.commons.lang.StringUtils.isNotEmpty(conditions.getJobName())){

26. sparkConf.setAppName(conditions.getJobName());

27. }

28.

29. sparkConf.set("spark.yarn.jars",conditions.getSparkYarnJars());

30. if(conditions.getAdditionalJars()!=null&&conditions.getAdditionalJars().length>0){

31. sparkConf.set("spark.jars",org.apache.commons.lang.StringUtils.join(conditions.getAdditionalJars(),","));

32. }

33.

34. if(conditions.getFiles()!=null&&conditions.getFiles().length>0){

35. sparkConf.set("spark.files",org.apache.commons.lang.StringUtils.join(conditions.getFiles(),","));

36. }

37. for(Map.Entrye:conditions.getSparkProperties().entrySet()){

38. sparkConf.set(e.getKey().toString(),e.getValue().toString());

39. }

40.

41. //添加这个参数,不然spark会一直请求0.0.0.0:8030,一直重试

42. sparkConf.set("yarn.resourcemanager.hostname",conditions.getYarnResourcemanagerAddress().split(":")[0]);

43. //设置为true,不删除缓存的jar包,因为现在提交yarn任务是使用的世界杯指定投注平台配置,没有配置文件,删除缓存的jar包有问题,

44. sparkConf.set("spark.yarn.preserve.staging.files","true");

45.

46. //初始化yarn的配置

47. Configurationcf=newConfiguration();

48. Stringos=System.getProperty("os.name");

49. booleancross_platform=false;

50. if(os.contains("Windows")){

51. cross_platform=true;

52. }

53. cf.setBoolean("mapreduce.app-submission.cross-platform",cross_platform);//配置使用跨平台提交任务

54. //设置yarn资源,不然会使用localhost:8032

55. cf.set("yarn.resourcemanager.address",conditions.getYarnResourcemanagerAddress());

56. //设置namenode的地址,不然jar包会分发,非常恶心

57. cf.set("fs.defaultFS",conditions.getSparkFsDefaultFS());

58.

59. ClientArgumentscArgs=newClientArguments(args.toArray(newString[args.size()]));

60. Clientclient=newClient(cArgs,cf,sparkConf);

61. logger.info("提交任务,任务名称:"+conditions.getJobName());

62.

63. try{

64.

65. ApplicationIdappId=client.submitApplication();

66.

67. returnappId.toString();

68.

69. }catch(Exceptione){

70. logger.error("提交spark任务失败",e);

71. returnnull;

72. }finally{

73. if(client!=null){

74. client.stop();

75. }

76. }

77. }

2、任务进度获取

1. /**

2. *停止spark任务

3. *

4. *@paramyarnResourcemanagerAddress

5. *yarn资源管理器地址,例如:master:8032,查看yarn集群获取具体地址

6. *@paramappIdStr

7. *需要取消的任务id

8. */

9. publicvoidkillJob(StringyarnResourcemanagerAddress,StringappIdStr){

10. logger.info("取消spark任务,任务id:"+appIdStr);

11. //初始化yarn的配置

12. Configurationcf=newConfiguration();

13. Stringos=System.getProperty("os.name");

14. booleancross_platform=false;

15. if(os.contains("Windows")){

16. cross_platform=true;

17. }

18. cf.setBoolean("mapreduce.app-submission.cross-platform",cross_platform);//配置使用跨平台提交任务

19. //设置yarn资源,不然会使用localhost:8032

20. cf.set("yarn.resourcemanager.address",yarnResourcemanagerAddress);

21.

22. //创建yarn的客户端,此类中有杀死任务的方法

23. YarnClientyarnClient=YarnClient.createYarnClient();

24. //初始化yarn的客户端

25. yarnClient.init(cf);

26. //yarn客户端启动

27. yarnClient.start();

28. try{

29. //根据应用id,杀死应用

30. yarnClient.killApplication(getAppId(appIdStr));

31. }catch(Exceptione){

32. logger.error("取消spark任务失败",e);

33. }

34. //关闭yarn客户端

35. yarnClient.stop();

36.

37. }

3、任务取消

1. /**

2. *获取spark任务状态

3. *

4. *

5. *@paramyarnResourcemanagerAddress

6. *yarn资源管理器地址,例如:master:8032,查看yarn集群获取具体地址

7. *@paramappIdStr

8. *需要取消的任务id

9. */

10. publicSparkTaskStategetStatus(StringyarnResourcemanagerAddress,StringappIdStr){

11. logger.info("获取任务状态启动,任务id:"+appIdStr);

12. //初始化yarn的配置

13. Configurationcf=newConfiguration();

14. Stringos=System.getProperty("os.name");

15. booleancross_platform=false;

16. if(os.contains("Windows")){

17. cross_platform=true;

18. }

19. cf.setBoolean("mapreduce.app-submission.cross-platform",cross_platform);//配置使用跨平台提交任务

20. //设置yarn资源,不然会使用localhost:8032

21. cf.set("yarn.resourcemanager.address",yarnResourcemanagerAddress);

22. logger.info("获取任务状态,任务id:"+appIdStr);

23.

24. SparkTaskStatetaskState=newSparkTaskState();

25. //设置任务id

26. taskState.setAppId(appIdStr);

27. YarnClientyarnClient=YarnClient.createYarnClient();

28. //初始化yarn的客户端

29. yarnClient.init(cf);

30. //yarn客户端启动

31. yarnClient.start();

32. ApplicationReportreport=null;

33. try{

34. report=yarnClient.getApplicationReport(getAppId(appIdStr));

35. }catch(Exceptione){

36. logger.error("获取spark任务状态失败");

37. }

38.

39. if(report!=null){

40. YarnApplicationStatestate=report.getYarnApplicationState();

41. taskState.setState(state.name());

42. //任务执行进度

43. floatprogress=report.getProgress();

44. taskState.setProgress(progress);

45. //最终状态

46. FinalApplicationStatusstatus=report.getFinalApplicationStatus();

47. taskState.setFinalStatus(status.name());

48. }else{

49. taskState.setState(ConstParam.SPARK_FAILED);

50. taskState.setProgress(0.0f);

51. taskState.setFinalStatus(ConstParam.SPARK_FAILED);

52. }

53.

54. //关闭yarn客户端

55. yarnClient.stop();

56. logger.info("获取任务状态结束,任务状态:"+JSON.toJSONString(taskState));

57. returntaskState;

58. }

四、yarn参数调节

1、可分配给容器的物理内存数量,一个nodemanage分配的内存,如果机器内存是128g,尽量分配2/3

yarn.nodemanager.resource.memory-mb:80g

2、可以为容器分配的虚拟 CPU 内核的数量。该参数在 CDH 4.4 以前版本中无效。一个nodemanage分配的核数。如果机器是64和,尽量分配2/3.

yarn.nodemanager.resource.cpu-vcores:40

3、Java 进程堆栈内存的最大大小(以字节为单位)。已传递到 Java -Xmx。

ResourceManager 的 Java 堆栈大小(字节)

ResourceManager Default Group

B千字节兆字节吉字节

点击复制链接 与好友分享!回本站首页
上一篇:设计模式学习之Command模式——命令也是类
下一篇:Java8 Optional机制的正确使用方式
相关文章
图文推荐
点击排行

关于我们 | 联系我们 | 服务 | 投资合作 | 版权申明 | 在线帮助 | 网站地图 | 作品发布 | Vip技术培训 | 举报中心

版权所有: 红黑--致力于做实用的IT技术学习网站